package com.meicai.data.engine.distribute;

import com.meicai.data.core.DataContext;
import com.meicai.data.engine.DataTransform;
import com.meicai.data.pojo.Item;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link DataTransform} used on the reduce side of a MapReduce job: input records come
 * from the {@code Iterable<Text>} handed to the constructor, and output records are
 * written to the reducer {@link Context}.
 */
public class MRTransform extends DataTransform {

    /** Separator reported by {@link #getSep()}; initialized to a tab character. */
    protected String sep = "\t";

    /** Destination that {@link #sinkData(DataContext)} writes records to. */
    private final Context context;

    /** Input values that {@link #fetchData()} converts to strings. */
    private final Iterable<Text> values;

    /**
     * Creates a transform bound to one group of reducer input values.
     *
     * @param context reducer context that receives the output records
     * @param values  input values to be read by {@link #fetchData()}
     * @param bTest   forwarded unchanged to the {@link DataTransform} constructor
     *                (presumably a test-mode switch; confirm against DataTransform)
     */
    public MRTransform(Context context, Iterable<Text> values, boolean bTest) {
        super(bTest);
        this.context = context;
        this.values = values;
    }

    /**
     * Copies every input value into a new list as a {@code String}.
     *
     * <p>NOTE(review): the values iterable given to a Hadoop reducer is typically
     * single-pass, so a second call on the same instance would likely return an empty
     * list; confirm this method is invoked only once per instance.
     *
     * @return a freshly built list with one string per input value, in iteration order
     */
    @Override
    public Iterable<String> fetchData() {
        List<String> rows = new ArrayList<String>();
        for (Text value : values) {
            String row = value.toString();
            rows.add(row);
        }
        return rows;
    }

    /**
     * Writes each item of {@code dataContext.itemList} to the reducer context as a
     * {@link Text} built from the item's {@code toString()}, keyed by
     * {@link NullWritable}.
     *
     * @param dataContext holder of the items to emit
     * @return always {@code true}
     * @throws IOException          propagated from the context write
     * @throws InterruptedException propagated from the context write
     */
    @Override
    public boolean sinkData(DataContext dataContext) throws IOException, InterruptedException {
        for (Item item : dataContext.itemList) {
            NullWritable key = NullWritable.get();
            Text line = new Text(item.toString());
            context.write(key, line);
        }
        return true;
    }

    /**
     * @return the current value of {@link #sep}
     */
    @Override
    public String getSep() {
        return sep;
    }
}
